-
-
Notifications
You must be signed in to change notification settings - Fork 315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add broadcast functionality from triggers #1156
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
af3caf2
to
6fbe933
Compare
lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex
Outdated
Show resolved
Hide resolved
b346ad0
to
b27bf56
Compare
lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex
Outdated
Show resolved
Hide resolved
b27bf56
to
ef0d4ca
Compare
Should our payload match what we have here in the Webhooks trigger? |
9ab5171
to
0e27113
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wouldn't hurt to have a check like
IF TG_LEVEL = 'STATEMENT' THEN
RAISE EXCEPTION 'realtime.broadcast_changes should be triggered for each row, not for each statement';
END IF;
to make sure no one tries to "optimize" their trigger for bulk operations
lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex
Outdated
Show resolved
Hide resolved
lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex
Outdated
Show resolved
Hide resolved
Adds a new functionality to broadcast db changes. It adds: * New function to broadcast from postgres using triggers
bd3075e
to
25a965b
Compare
ad4ac2b
to
122e4a4
Compare
122e4a4
to
7dc2e04
Compare
7bc88ec
to
63aa048
Compare
63aa048
to
54860fd
Compare
Hi @filipecabaco! This looks exciting! I have two questions: Today you start a single replication process, but as far as I understand, it has a single handler, and the handler only looks into a single key in the registry. In this case, what value is the registry buying? Is it in case someone reconnects, you get will get a new entry in the registry and dispatch to that? And what happens if multiple clients are trying to subscribe to the same replication? Also, should the replication connection eventually terminate if no one is connected? |
Hi @josevalim and thank you! We actually have multiple handlers, one per tenant and the same happens for PosgtresReplication modules. Meaning that we need to register one process of Handler and one process of PostgresReplication per tenant on connect. Assumptions:
So the flow is as follows:
On the PostgresReplication / Handler:
Termination:
|
Thank you, that makes complete sense! The existing connection is already responsible for setting up the concurrency and termination. With this in mind, I think you could simplify the current implementation? For example, you might not need the registry for handlers: instead, a handler starts the replication and pass You probably don't need a registry for the postgrex replication either? Each connection knows the replication pid and the handler will be directly messaged. WDYT? There is even a question if you should have both a handler and a listener and if they could be the same process, but we can discuss this later. :) |
Makes sense! Will work on removing the registries from both of them and will actually move the broadcast into PostgresReplication. I did this heavily based on a quick lib I did to learn more about replication ( github.com/filipecabaco/postgres_replication ) and kept that format but for this specific scenario it's just adding more complexity without gains. Will rework this today / tomorrow. Thank you for the feedback 🙏 |
@josevalim did the change 👍 feels way simpler and less error prone. Still needed a Registry due to the fact that the tests were creating multiple entries and ended up clashing. There might be some really heavy improvements to be done on the Realtime.Tenants.Connect module to simplify this. |
Fantastic! Final question: how would they clash? Where they named before? If so, why did they have to be named? |
They were named before yes. When we do I do suspect that it's due to the way |
The tenant should store in it its state if it has a replication connection or not, so you should not have dupes, right? Unless you want to access this information externally, from another process, and you don't want to ask the tenant for the PID every time. |
I was saving the pid as the way to signal if there was a replication connection but might have fumbled something. will check the code again to see if i can get that name registration removed |
buffer = Enum.reverse(buffer) | ||
tenant = Cache.get_tenant_by_external_id(tenant_id) | ||
|
||
case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're building up records and then broadcasting on commit?
I think there could be a LOT of changes before the commit message.
Could oom Realtime, and also cause significant delays in clients receiving messages.
If a change hits the wal we should just send it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no strong opinion honestly. it would be less logic so I would be ok with that.
since this is in our side to control, all inserts into realtime.messages should be quite "sane" and probably won't hit any issues
lib/realtime/tenants/repo/migrations/20240919163305_change_messages_id_type.ex
Outdated
Show resolved
Hide resolved
f4c2c19
to
a7f3d6a
Compare
a7f3d6a
to
cebe0c1
Compare
🎉 This PR is included in version 2.33.0 🎉 The release is available on GitHub release Your semantic-release bot 📦🚀 |
What kind of change does this PR introduce?
Adds a new functionality to broadcast db changes. It adds:
uuid
column torealtime.messages
so we can later change all ids from serial id to uuid formatrealtime.send
postgres function to broadcast messagesrealtime.broadcast_changes
postgres function to broadcast broadcast changes specifically and be used in trigger functionsrealtime.messages
Example: